#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "limits.h"
#include "adt.h"
#include "ynet_rpc.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "chunk.h"
#include "bmap.h"
#include "metadata.h"
#include "../../ynet/sock/sock_tcp.h"
#include "rpc_proto.h"
#include "net_table.h"
#include "configure.h"
#include "net_global.h"
#include "lich_md.h"
#include "squeue.h"
#include "cache.h"
#include "prof.h"
#include "ylog.h"
#include "vm.h"
#include "dbg.h"

/* Size in bytes of one benchmark message: the client sends and receives
 * MSG_SIZE bytes per round trip, and the server consumes MSG_SIZE bytes per
 * request and replies with MSG_SIZE bytes. */
#define MSG_SIZE 64

/* TCP port the benchmark server listens on and the client connects to. */
#define __prof_vm_port__ 27903

/* Per-worker state for the client side of the benchmark. One instance is
 * handed to each client worker thread. */
typedef struct {
        sem_t sem;      /* not referenced by the code visible in this file */
        uint64_t io;    /* completed round trips; incremented by the worker, summed by the printer */
        int sd;         /* connected socket the worker sends and receives on */
        int sendsize;   /* not referenced by the code visible in this file */
        int recvsize;   /* not referenced by the code visible in this file */
} prof_net_arg_t;

/* Listening socket of the benchmark server; set by prof_vm_init(). */
static int __listen_sd__;
/* Client run flag: set to 1 by prof_vm() before starting workers and cleared
 * when the run ends; workers loop while it is non-zero. */
static int __running__ = 0;

/*
 * Task body for one benchmark request: reply to the peer with MSG_SIZE bytes
 * and release the request.
 *
 * @param arg  rpc_request_t * created by __prof_vm_newtask(); this function
 *             takes ownership and frees both its buffer and the request.
 *
 * The reply payload carries no information, so it is sent as zeros. The
 * buffer is explicitly cleared so that no uninitialized stack bytes are
 * leaked to the peer.
 */
inline static void __prof_vm_exec(void *arg)
{
        int ret;
        rpc_request_t *rpc_request = arg;
        char buf[MSG_SIZE];

        memset(buf, 0x0, sizeof(buf));

        /* Non-blocking send: a failure is only logged, the request is
         * released regardless. */
        ret = send(rpc_request->sockid.sd, buf, sizeof(buf), MSG_DONTWAIT);
        if (ret < 0) {
                DINFO("send fail\n");
        }

        DBUG("exec\n");

        mbuffer_free(&rpc_request->buf);
        mem_cache_free(MEM_CACHE_64, rpc_request);
}

/*
 * Split the current vm's receive buffer into MSG_SIZE-byte requests and
 * schedule one "prof_task" per request.
 *
 * @param _count  out: number of tasks scheduled by this call.
 * @return always 0.
 */
static int __prof_vm_newtask(int *_count)
{
        vm_t *self = vm_self();
        int fd = self->sd;
        buffer_t *rbuf = &self->recv_buf;
        rpc_request_t *req;
        int spawned;

        DBUG("new task, sd %u buflen %u\n", fd, rbuf->len);

        /* Each iteration consumes exactly one message from the receive
         * buffer; leftover bytes (< MSG_SIZE) stay queued for a later call. */
        for (spawned = 0; rbuf->len >= MSG_SIZE; spawned++) {
                req = mem_cache_calloc(MEM_CACHE_64, 0);
                mbuffer_init(&req->buf, 0);
                mbuffer_pop(rbuf, &req->buf, MSG_SIZE);
                req->sockid.sd = fd;

                /* The task (__prof_vm_exec) frees the request. */
                schedule_task_new("prof_task", __prof_vm_exec, req, -1);
        }

        *_count = spawned;

        return 0;
}

/*
 * Accept one pending connection on the listening socket and hand it to a
 * new "prof_vm" vm whose exec callback is __prof_vm_newtask().
 *
 * @return 0 on success; otherwise the errno from accept() or the error
 *         returned by tcp_sock_tuning()/vm_create(). On failure after a
 *         successful accept() the new socket is closed.
 */
static int __prof_vm_accept__()
{
        int ret, sd;
        struct sockaddr_in sin;
        socklen_t alen;
        vm_op_t vm_op;

        _memset(&sin, 0, sizeof(sin));
        alen = sizeof(struct sockaddr_in);

        /* accept() takes a generic struct sockaddr *; cast explicitly rather
         * than relying on a compiler/libc extension to convert the pointer. */
        sd = accept(__listen_sd__, (struct sockaddr *)&sin, &alen);
        if (sd < 0 ) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = tcp_sock_tuning(sd, 1, 1);
        if (unlikely(ret))
                GOTO(err_sd, ret);

        DINFO("accept new sd %u\n", sd);

        memset(&vm_op, 0x0, sizeof(vm_op));
        vm_op.name = "prof_vm";
        vm_op.sd = sd;
        vm_op.exec = __prof_vm_newtask;

        ret = vm_create(&vm_op, NULL);
        if (unlikely(ret))
                GOTO(err_sd, ret);

        return 0;
err_sd:
        (void) close(sd);
err_ret:
        return ret;
}

/*
 * Thread entry point of the benchmark server's accept loop.
 *
 * Polls the listening socket for readability and accepts one connection per
 * wakeup. Poll timeouts (ETIMEDOUT / ETIME) simply restart the poll; any
 * other poll error, or any accept failure, ends the thread.
 *
 * @param _arg  unused.
 * @return always NULL.
 */
static void *__prof_vm_accept(void *_arg)
{
        int ret;

        (void) _arg;

        for (;;) {
                /* Timeout argument is 1000 * 1000 — presumably microseconds
                 * (one second); confirm against sock_poll_sd(). */
                ret = sock_poll_sd(__listen_sd__, 1000 * 1000, POLLIN);
                if (ret == ETIMEDOUT || ret == ETIME)
                        continue;

                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __prof_vm_accept__();
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

err_ret:
        return NULL;
}

int prof_vm_init()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;
        char port[MAX_NAME_LEN];

        snprintf(port, MAX_NAME_LEN, "%d", __prof_vm_port__);
        ret = tcp_sock_hostlisten(&__listen_sd__, NULL, port,
                                  YNET_QLEN, YNET_RPC_BLOCK, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
 
        ret = pthread_create(&th, &ta, __prof_vm_accept, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Client worker thread: repeatedly sends one MSG_SIZE-byte message on
 * arg->sd, waits for a MSG_SIZE-byte reply, and counts the round trip in
 * arg->io, until __running__ is cleared.
 *
 * @param _arg  prof_net_arg_t * owned by the caller; the worker uses its
 *              `sd` and increments its `io`. The socket is NOT closed here.
 * @return always NULL.
 *
 * Any failure goes through UNIMPLEMENTED(__DUMP__), which is presumably
 * fatal (TODO confirm); the code after it assumes the call succeeded.
 */
static void *__prof_vm_cli_worker(void *_arg)
{
        int ret, pipe_in[2], pipe_out[2];
        prof_net_arg_t *arg = _arg;
        char *in = NULL, *out = NULL;

        /* posix_memalign() returns 0 on success and a positive error number
         * on failure (it never returns a negative value). */
        ret = posix_memalign((void **)&in, 4096, MAX_BUF_LEN);
        if (ret) {
                UNIMPLEMENTED(__DUMP__);
        }

        ret = posix_memalign((void **)&out, 4096, MAX_BUF_LEN);
        if (ret) {
                UNIMPLEMENTED(__DUMP__);
        }

        /* Only the first bytes of `in` are overwritten by strcpy() below;
         * clear the whole message so no uninitialized heap bytes are sent. */
        memset(in, 0x0, MSG_SIZE);

        /* The pipes are only used by the USE_SPLICE code paths. */
        ret = pipe(pipe_in);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

        ret = pipe(pipe_out);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

        while (__running__) {
                strcpy(in, "xxxxxxxxx");
#ifndef USE_SPLICE
                ret = send(arg->sd, in, MSG_SIZE, MSG_DONTWAIT);
                if (ret < 0) {
                        ret = errno;
                        UNIMPLEMENTED(__DUMP__);
                }
#else
                ret = __splice_out(arg->sd, pipe_out, out, MSG_SIZE);
                if (ret < 0) {
                        ret = -ret;
                        UNIMPLEMENTED(__DUMP__);
                }
#endif

                YASSERT(ret == MSG_SIZE);
#ifndef USE_SPLICE
                ret = recv(arg->sd, out, MSG_SIZE, 0);
                if (ret < 0) {
                        ret = errno;
                        UNIMPLEMENTED(__DUMP__);
                }
#else
                /* NOTE(review): this passes pipe_out, not pipe_in — confirm
                 * that is intended. */
                ret = __splice_in(arg->sd, pipe_out, out, MSG_SIZE);
                if (ret < 0) {
                        ret = -ret;
                        UNIMPLEMENTED(__DUMP__);
                }
#endif

                YASSERT(ret == MSG_SIZE);

                arg->io++;
        }

        /* Release everything this thread acquired. */
        (void) close(pipe_in[0]);
        (void) close(pipe_in[1]);
        (void) close(pipe_out[0]);
        (void) close(pipe_out[1]);
        free(in);
        free(out);

        return NULL;
}

/*
 * Print the aggregate throughput of all client workers since `begin`.
 *
 * @param args     array of per-worker state; only `io` is read.
 * @param threads  number of entries in `args` to sum.
 * @param begin    start time of the run.
 *
 * The rate is computed as total * 1000 * 1000 / elapsed, i.e. the elapsed
 * value from _time_used() is presumably in microseconds (TODO confirm).
 */
static void __prof_vm_cli_print(prof_net_arg_t *args, int threads, struct timeval *begin)
{
        struct timeval now;
        uint64_t total = 0, elapsed;
        int idx = 0;

        /* Sum the round-trip counters of every worker. */
        while (idx < threads) {
                total += args[idx].io;
                idx++;
        }

        _gettimeofday(&now, NULL);
        elapsed = _time_used(begin, &now);

        DBUG("io %u, used %u\n", (int)total, (int)elapsed);
        printf("iops : %llu\n", (long long)total * 1000 * 1000 / elapsed);
}


int prof_vm(const char *srv, int threads, int runtime)
{
        int ret, i;
        net_handle_t nh;
        struct timeval begin;
        prof_net_arg_t args[1024], *arg;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        _gettimeofday(&begin, NULL);

        char port[MAX_NAME_LEN];

        snprintf(port, MAX_NAME_LEN, "%d", __prof_vm_port__);

        __running__ = 1;

        for (i = 0; i < threads; i++) {
                arg = &args[i];
                arg->io = 0;

                ret = tcp_sock_hostconnect(&nh, srv, port, 0, 10, 0);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                arg->sd = nh.u.sd.sd;

                ret = tcp_sock_tuning(arg->sd, 1, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = pthread_create(&th, &ta, __prof_vm_cli_worker, arg);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        //end = gettime();
        i = 0;
        while (i < runtime) {
                sleep(1);
                i++;
                __prof_vm_cli_print(args, threads, &begin);
        }

        __running__ = 0;

        return 0;
err_ret:
        return ret;
}
